/*-------------------------------------------------------------------------
 *
 * postgres_px.c
 *
 * Copyright (c) 2020, Alibaba Group Holding Limited
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * IDENTIFICATION
 *	  src/backend/tcop/postgres_px.c
 *
 *-------------------------------------------------------------------------
 */

/* POLAR px */

/*
 * exec_px_query
 *
 * Called in a qExec process to read and execute a query plan sent by PxDispatchPlan().
 *
 * query_string -- optional query text (C string).
 * serializedQuerytree[len]  -- Query node or (NULL,0) if plan provided.
 * serializedPlantree[len] -- PlannedStmt node, or (NULL,0) if query provided.
 * serializedParams[len] -- optional parameters
 * serializedQueryDispatchDesc[len] -- QueryDispatchDesc node, or (NULL,0) if query provided.
 *
 * Caller may supply either a Query (representing utility command) or
 * a PlannedStmt (representing a planned DML command), but not both.
 */
static void
exec_px_query(const char *query_string,
			   const char * serializedQuerytree, int serializedQuerytreelen,
			   const char * serializedPlantree, int serializedPlantreelen,
			   const char * serializedParams, int serializedParamslen,
			   const char * serializedQueryDispatchDesc, int serializedQueryDispatchDesclen)
{
	CommandDest dest = whereToSendOutput;
	MemoryContext oldcontext;
	bool		save_log_statement_stats = log_statement_stats;
	bool		was_logged = false;
	char		msec_str[32];
	Node		   *utilityStmt = NULL;
	PlannedStmt	   *plan = NULL;
	QueryDispatchDesc *ddesc = NULL;
	CmdType		commandType = CMD_UNKNOWN;
	SliceTable *sliceTable = NULL;
	ExecSlice  *slice = NULL;
	ParamListInfo paramLI = NULL;

	Assert(px_role == PX_ROLE_PX);

	/*
	 * If we didn't get passed a query string, dummy something up for ps display and pg_stat_activity
	 */
	if (query_string == NULL || strlen(query_string)==0)
		query_string = "polarpx";

	/*
	 * Report query to various monitoring facilities.
	 */

	debug_query_string = query_string;

	pgstat_report_activity(STATE_RUNNING, query_string);

	/*
	 * We use save_log_statement_stats so ShowUsage doesn't report incorrect
	 * results because ResetUsage wasn't called.
	 */
	if (save_log_statement_stats)
		ResetUsage();

	/*
	 * Start up a transaction command.	All queries generated by the
	 * query_string will be in this same command block, *unless* we find a
	 * BEGIN/COMMIT/ABORT statement; we have to force a new xact command after
	 * one of those, else bad things will happen in xact.c. (Note that this
	 * will normally change current memory context.)
	 */
	start_xact_command();

	/*
	 * Zap any pre-existing unnamed statement.	(While not strictly necessary,
	 * it seems best to define simple-Query mode as if it used the unnamed
	 * statement and portal; this ensures we recover any storage used by prior
	 * unnamed operations.)
	 */
	drop_unnamed_stmt();

	/*
	 * Switch to appropriate context for constructing parsetrees.
	 */
	oldcontext = MemoryContextSwitchTo(MessageContext);

	/*
	 * Deserialize the Query node, if there is one.  If this is a planned stmt, then
	 * there isn't one, but there must be a PlannedStmt later on.
	 */
	if (serializedQuerytree != NULL && serializedQuerytreelen > 0)
	{
		Query *query = (Query *) deserializeNode(serializedQuerytree,serializedQuerytreelen);

		if ( !IsA(query, Query) || query->commandType != CMD_UTILITY )
			elog(ERROR, "POLARPX: received non-utility Query node.");

		utilityStmt = query->utilityStmt;
	}

 	/*
     * Deserialize the query execution plan (a PlannedStmt node), if there is one.
     */
	if (serializedPlantree != NULL && serializedPlantreelen > 0)
	{
		plan = (PlannedStmt *) deserializeNode(serializedPlantree,serializedPlantreelen);
		if (!plan || !IsA(plan, PlannedStmt))
			elog(ERROR, "POLARPX: receive invalid planned statement");
    }

	/*
     * Deserialize the extra execution information (a QueryDispatchDesc node), if there is one.
     */
    if (serializedQueryDispatchDesc != NULL && serializedQueryDispatchDesclen > 0)
    {
		ddesc = (QueryDispatchDesc *) deserializeNode(serializedQueryDispatchDesc,serializedQueryDispatchDesclen);
		if (!ddesc || !IsA(ddesc, QueryDispatchDesc))
			elog(ERROR, "POLARPX: received invalid QueryDispatchDesc with planned statement");

        sliceTable = ddesc->sliceTable;

		if (sliceTable)
		{
			int			i;

			if (!IsA(sliceTable, SliceTable) ||
				sliceTable->localSlice < 0 ||
				sliceTable->localSlice >= sliceTable->numSlices)
				elog(ERROR, "POLARPX: received invalid slice table: %d", sliceTable->localSlice);

			/* Identify slice to execute */
			for (i = 0; i < sliceTable->numSlices; i++)
			{
				slice = &sliceTable->slices[i];

				if (bms_is_member(px_identifier, slice->processesMap))
					break;
			}
			if (i == sliceTable->numSlices)
				elog(ERROR, "could not find PX identifier in process map");
			sliceTable->localSlice = slice->sliceIndex;

			/* Set global sliceid variable for elog. */
			currentSliceId = sliceTable->localSlice;

			/* px_log_querydesc(ddesc); */

			get_worker_info_by_identifier(slice, &px_logical_worker_idx, &px_logical_total_workers);
			ic_htab_size = px_logical_total_workers * 5;
			PxIdentity.workerid = px_logical_worker_idx;
			PxIdentity.dbid = px_logical_worker_idx;
		}
    }

	elog((px_enable_print ? LOG : DEBUG1),
		"begin exec px query on node: local address %s, listen port %d, sessid %d, trace_id %ld, px_worker_id %d", 
		ListenAddresses, PostPortNumber, px_session_id, sql_trace_id.uval, 
		px_logical_worker_idx);

	/*
	 * Choose the command type from either the Query or the PlannedStmt.
	 */
    if ( utilityStmt )
    	commandType = CMD_UTILITY;
    else
	/*
	 * Get (possibly 0) parameters.
	 */
    {
    	if ( !plan )
    		elog(ERROR, "POLARPX: received neither Query nor Plan");

    	/* This must be a planned statement. */
	    if (plan->commandType != CMD_SELECT &&
        	plan->commandType != CMD_INSERT &&
        	plan->commandType != CMD_UPDATE &&
        	plan->commandType != CMD_DELETE)
        	elog(ERROR, "POLARPX: received non-DML Plan");

        commandType = plan->commandType;
	}
	if ( slice )
	{
		/* Non root slices don't need update privileges. */
		if (sliceTable->localSlice != slice->rootIndex)
		{
			ListCell       *rtcell;
			RangeTblEntry  *rte;
			AclMode         removeperms = ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_SELECT_FOR_UPDATE;

			/* Just reading, so don't check INS/DEL/UPD permissions. */
			foreach(rtcell, plan->rtable)
			{
				rte = (RangeTblEntry *)lfirst(rtcell);
				if (rte->rtekind == RTE_RELATION &&
					0 != (rte->requiredPerms & removeperms))
					rte->requiredPerms &= ~removeperms;
			}
		}
	}


	if (log_statement != LOGSTMT_NONE)
	{
		/*
		 * TODO need to log SELECT INTO as DDL
		 */
		if (log_statement == LOGSTMT_ALL ||
			(utilityStmt && log_statement == LOGSTMT_DDL) ||
			(plan && log_statement >= LOGSTMT_MOD))

		{
			ereport(LOG, (errmsg("statement: %s", query_string)
						   ));
			was_logged = true;
		}

	}

	/*
	 * Get (possibly 0) parameters.
	 */
	if (serializedParams != NULL && serializedParamslen > 0)
		paramLI = deserializeParamListInfo(serializedParams, serializedParamslen);
	else
		paramLI = NULL;

	/*
	 * Switch back to transaction context to enter the loop.
	 */
	MemoryContextSwitchTo(oldcontext);

	/*
	 * All unpacked and checked.  Process the command.
	 */
	{
		const char *commandTag;
		char		completionTag[COMPLETION_TAG_BUFSIZE];

		Portal		portal;
		DestReceiver *receiver;
		int16		format;
		Snapshot	snapshot = InvalidSnapshot;

		if (pxsn_get_serialized_snapshot_data())
			snapshot = RestoreSnapshot(pxsn_get_serialized_snapshot_data());

		/*
		 * Get the command name for use in status display (it also becomes the
		 * default completion tag, down inside PortalRun).	Set ps_status and
		 * do any special start-of-SQL-command processing needed by the
		 * destination.
		 */
		if (commandType == CMD_UTILITY)
			commandTag = "POLARPX UTILITY";
		else if (commandType == CMD_SELECT)
			commandTag = "POLARPX SELECT";
		else if (commandType == CMD_INSERT)
			commandTag = "POLARPX INSERT";
		else if (commandType == CMD_UPDATE)
			commandTag = "POLARPX UPDATE";
		else if (commandType == CMD_DELETE)
			commandTag = "POLARPX DELETE";
		else
			commandTag = "POLARPX";


		set_ps_display(commandTag, false);

		BeginCommand(commandTag, dest);

		/*
		 * If we are in an aborted transaction, reject all commands except
		 * COMMIT/ABORT.  It is important that this test occur before we try
		 * to do parse analysis, rewrite, or planning, since all those phases
		 * try to do database accesses, which may fail in abort state. (It
		 * might be safe to allow some additional utility commands in this
		 * state, but not many...)
		 */
		if (IsAbortedTransactionBlockState() /*&&*/
			/*!IsTransactionExitStmt(parsetree)*/)
			ereport(ERROR,
					(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
					 errmsg("current transaction is aborted, "
							"commands ignored until end of transaction block")));

		/* Make sure we are in a transaction command */
		start_xact_command();

		/* If we got a cancel signal in parsing or prior command, quit */
		CHECK_FOR_INTERRUPTS();

		/*
		 * OK to analyze, rewrite, and plan this query.
		 *
		 * Switch to appropriate context for constructing querytrees (again,
		 * these must outlive the execution context).
		 */
		oldcontext = MemoryContextSwitchTo(MessageContext);

		/* If we got a cancel signal in analysis or planning, quit */
		CHECK_FOR_INTERRUPTS();

		/*
		 * Create unnamed portal to run the query or queries in. If there
		 * already is one, silently drop it.
		 */
		portal = CreatePortal("", true, true);
		/* Don't display the portal in pg_cursors */
		portal->visible = false;

		/*
		 * We don't have to copy anything into the portal, because everything
		 * we are passing here is in MessageContext, which will outlive the
		 * portal anyway.
		 */
		PortalDefineQuery(portal,
						  NULL,
						  query_string,
						  T_Query, /* not a parsed statement, so not T_SelectStmt */
						  commandTag,
						  list_make1(plan ? (Node*)plan : (Node*)utilityStmt),
						  NULL);

		/*
		 * Start the portal.
		 */
		PortalStart(portal, 
					paramLI, 
					0, 
					snapshot, 
					ddesc/* POLAR px */
					);

		/*
		 * Select text output format, the default.
		 */
		format = 0;
		PortalSetResultFormat(portal, 1, &format);

		/*
		 * Now we can create the destination receiver object.
		 */
		receiver = CreateDestReceiver(dest);
		if (dest == DestRemote)
			SetRemoteDestReceiverParams(receiver, portal);

		/*
		 * Switch back to transaction context for execution.
		 */
		MemoryContextSwitchTo(oldcontext);

		/*
		 * Run the portal to completion, and then drop it (and the receiver).
		 */
		(void) PortalRun(portal,
						 FETCH_ALL,
						 true, /* Effectively always top level. */
						 true,
						 receiver,
						 receiver,
						 completionTag);

		(*receiver->rDestroy) (receiver);

		PortalDrop(portal, false);

		/*
		 * Close down transaction statement before reporting command-complete.
		 * This is so that any end-of-transaction errors are reported before
		 * the command-complete message is issued, to avoid confusing
		 * clients who will expect either a command-complete message or an
		 * error, not one and then the other.
		 */
		finish_xact_command();

		/*
		 * Tell client that we're done with this query.  Note we emit exactly
		 * one EndCommand report for each raw parsetree, thus one for each SQL
		 * command the client sent, regardless of rewriting. (But a command
		 * aborted by error will not send an EndCommand report at all.)
		 */
		EndCommand(completionTag, dest);
	}							/* end loop over parsetrees */

	/*
	 * Close down transaction statement, if one is open.
	 */
	finish_xact_command();

	/*
	 * Emit duration logging if appropriate.
	 */
	switch (check_log_duration(msec_str, was_logged))
	{
		case 1:
			ereport(LOG,
					(errmsg("duration: %s ms", msec_str),
					 errhidestmt(true)));
			break;
		case 2:
			ereport(LOG,
					(errmsg("duration: %s ms  statement: %s",
							msec_str, query_string),
					 errhidestmt(true)));
			break;
	}

	if (save_log_statement_stats)
		ShowUsage("QUERY STATISTICS");

	debug_query_string = NULL;
}
